在Spring项目中以多线程的方式并发执行,异步处理任务。解决统计、累加类业务的例子。

您所在的位置:网站首页 spring boot线程 在Spring项目中以多线程的方式并发执行,异步处理任务。解决统计、累加类业务的例子。

在Spring项目中以多线程的方式并发执行,异步处理任务。解决统计、累加类业务的例子。

2024-01-12 20:58| 来源: 网络整理| 查看: 265

业务描述:

其实具体业务无所谓,这次解决的问题是“统计、累加类业务类型”,这里的业务就用”统计动物园中所有种类动物数量的总和”,类比代替了。

我要写一个接口,吐出 “动物园所有种类动物的总和”。已知目前有 15种动物,现在有现成的查询每种动物数量的接口,每种动物都要调用RPC接口去别的系统查询。且耗时较高。

工作方案:

根据上面的描述,线性去查询,调用15次RPC接口,时间花费巨大,所以放弃单线程模式。打算使用多线程的方法,进来请求后,分发 15个线程去查每一种动物的数据,返回结果。用多线程的话,在项目中肯定首先考虑使用线程池。

具体实现 (线程池 + 线程 + CountDownLatch ):1、配置线程池

ThreadPoolTaskExecutor 是Spring 对JUC包内的ThreadPoolExecutor上的封装,能配置Bean,注入SpringIOC 容器中,交给Spring管理

或者springBoot:

@Configuration @EnableAsync public class AsyncConfig { public static final String ASYNC_EXECUTOR_NAME = "asyncExecutor"; @Bean(name = ASYNC_EXECUTOR_NAME) public Executor asyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setTaskDecorator(new CopyingDecorator()); executor.setCorePoolSize(5); executor.setMaxPoolSize(20); executor.setQueueCapacity(1000); executor.setWaitForTasksToCompleteOnShutdown(true); executor.setThreadNamePrefix("AsyncThread-"); executor.initialize(); return executor; }

2、Service 实现 /** * 查询数量使用的线程池 */ @Autowired @Qualifier("threadPool") private ThreadPoolTaskExecutor threadPool; public long getAllAnimalCount(int accountType, String account) { try { // 初始化返回结果 AtomicLong resultValue = new AtomicLong(0); // 获取所有的动物类型 AllTypeEnum[] enumValues = AllTypeEnum.values(); // 开启倒计时协调器 CountDownLatch countDownLatch = new CountDownLatch(enumValues .length); // 用线程池分发线程分配处理每一个类型 for (AllTypeEnum tempEnum : enumValues ) { threadPool.execute(new AnimalCountThread(account, accountType, tempEnum.getType(), resultValue, countDownLatch)); } // 等所有线程都处理完之后再拿返回结果 countDownLatch.await(); return resultValue.get(); } catch (InterruptedException e1) { log.error("出现线程中断异常", e1); } catch (Exception e2) { log.error("出现未知异常", e2); } return 0; }3、线程的定义/** * 查询动物数量线程 * * @author XXX * @date 2020/05/14 */ @Data @Slf4j public class AnimalCountThread implements Runnable { /** * 账号 */ private String account; /** * 账户类型 */ private int accountType; /** * 动物类型 来自枚举 */ private int type; /** * 累加的目标值 */ private AtomicLong targetValue; /** * 栅栏 */ private CountDownLatch countDownLatch; /** * 构造函数 * * @param account 账号 * @param accountType 账号类型 * @param type 动物类型 * @param targetValue 累加的目标值 * @param countDownLatch 栅栏 */ public AnimalCountThread (String account, int accountType, int type, AtomicLong targetValue, CountDownLatch countDownLatch) { this.account = account; this.accountType = accountType; this.type = type; this.targetValue = targetValue; this.countDownLatch = countDownLatch; } @Override public void run() { try { AllTypeEnum typeEnum = AllTypeEnum.getEnumByType(getType()); if (typeEnum != null) { //获取具体业务Bean CommonAnimalService commonAnimalService = SpringContext.getBean("commonAnimalServiceImpl"); long num = commonAnimalService.countAnimalNum(getAccount(), getWarningType()); targetValue.getAndAdd(num); } }catch (Exception e) { log.error("线程执行出现异常",e); } finally { countDownLatch.countDown(); } } }总结 : 

1、线程中是无法直接使用注解注入JavaBean的,所以我从Spring容器里拿的。或者也可以不定义这个线程,使用匿名内部类的方法。

2、累计的目标值,直接使用 AtomicLong  省得自己去同步。

3、用CountDownLatch 等所有线程都处理完,主线程再拿返回结果。

4、CountDownLatch 在子线程中,一定要保证被调用到 countDown()。

5、线程池配置拒绝策略,另外三种都丢弃了任务,所以用交给主线程的这种方法比较适合当前业务。

6、线程池的配置队列长度:要是追求性能的话不能过长。越长耗时越长,接口性能越差。

7、接口最外层要合理使用缓存,缓解压力,在对外RPC接口出还可以配置限流。 由于运用了多线程,快进快出, 限流是为了减小峰值。快进快出的话即使限流。 吞吐量也会比不用“多线程”大。

8、一定要压测一下,对于线程池的配置,也可以根据压测结果,调配。

5月22日补充:

上面的实现方式,由于线程实例是实现Runable接口的方式,Runable run() 方法没有返回值的原因,所以用了公共的参数,AtomicLong  在线程内部累计计算的结果。而且用了CountDownLatch 进行同步操作,来保证主线程获取结果时,所有子任务处理完毕。

如果我们用其他方式时可以不用这两步。

先说线程池 +Callable + Future的方式。

一、Callable接口是jdk 1.4 以后提供的,能返回值,并且能抛异常。 public interface Callable { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; } Callable一般情况下是配合ExecutorService来使用的,在ExecutorService接口中声明了若干个submit方法的重载版本: Future submit(Callable task); Future submit(Runnable task, T result); Future submit(Runnable task); 因此我们只要创建好我们的线程对象(实现Callable接口或者Runnable接口),然后通过上面3个方法提交给线程池去执行即可。 二、Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。 public interface Future { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }

线程池 +Callable + Future的方式1、线程池的配置

同上

2、Service 实现 /** * 查询数量使用的线程池 */ @Autowired @Qualifier("threadPool") private ThreadPoolTaskExecutor threadPool; @Override public long getAllAnimalCount(int accountType, String account) { try { // 初始化返回结果 Long resultValue = 0L; // 获取所有的预警类型 AllTypeEnum[] enumValues = AllTypeEnum.values(); // 初始化,Future结果容器 List futureList = new ArrayList(); // 分发任务 for (AllTypeEnum tempEnum : enumValues) { Future tempResult = threadPool.submit(new AnimalCountTask (account, accountType, tempEnum.getType())); futureList.add(tempResult); } // 获取所有结果 for (Future tempFuture : futureList) { try { if (tempFuture.get() != null) { resultValue += tempFuture.get(); // 会阻塞,直到这个任务执行完毕。 } } catch (Exception e) { log.error("getAllAnimalCount,线程执行出现异常", e); } } return resultValue; } catch (Exception e) { log.error("[getAllAnimalCount]出现异常", e); } return 0; }3、Task 任务/** * 查询动物数量任务 Callable版本 * * @author XXX * @date 2020/05/14 */ @Data @Slf4j public class AnimalCountTask implements Callable { /** * 账号 */ private String account; /** * 账户类型 */ private int accountType; /** * 动物类型 */ private int animalType; public AnimalCountTask(String account, int accountType, int animalType) { this.account = account; this.accountType = accountType; this.animalType= animalType; } @Override public Long call() throws Exception { AllTypeEnum typeEnum = AllTypeEnum.getEnumByType(getType()); if (typeEnum != null) { //获取具体业务Bean CommonAnimalService commonAnimalService = SpringContext.getBean("commonAnimalServiceImpl"); long num = commonAnimalService.countAnimalNum(getAccount(), getWarningType()); return num; } } return null; } }

这种方式的实现,可以看到 获取结果  resultValue += tempFuture.get(); 时会阻塞。循环获取的时候,假如你第二个任务用时最长,那他在for循环的第二次时候,等半天才接着处理其他的。 

这个问题呢,可以优化。我想哪个子任务先做完,我就先获取那个子任务的结果,而不是傻傻的线性的一个任务一个任务的看。

JDK 8 提供了 CompletionService   具有这样的功能。它的实现类内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果。

CompletionService是Java8的新增接口,JDK为其提供了一个实现类ExecutorCompletionService。这个类是为线程池中Task的执行结果服务的,即为Executor中Task返回Future而服务的。 CompletionService的实现目标是“任务先完成可优先获取到,按完成先后顺序排序” public interface CompletionService { // 提交 Future submit(Callable task); Future submit(Runnable task, V result); // 获取 Future take() throws InterruptedException; Future poll(); Future poll(long timeout, TimeUnit unit) throws InterruptedException; }

Future submit(Callable task):提交一个Callable类型任务,并返回该任务执行结果关联的Future;Future submit(Runnable task,V result):提交一个Runnable类型任务,并返回该任务执行结果关联的Future;Future take():从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞,直到有任务完成;Future poll():从内部阻塞队列中获取并移除第一个执行完成的任务,获取不到则返回null,不阻塞;Future poll(long timeout, TimeUnit unit):从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞时间为timeout,获取不到则返回null;线程池 +Callable + ExecutorCompletionService 的方式:1、线程池的配置

同上

2、Service实现 /** * 查询数量使用的线程池 */ @Autowired @Qualifier("threadPool") private ThreadPoolTaskExecutor threadPool; @Override public long getAllAnimalCount(int accountType, String account) { try { // 初始化返回结果 Long resultValue = 0L; // 获取所有的动物类型 AllTypeEnum[] enumValues = AllTypeEnum.values(); // 实例化 CompletionService CompletionService completionService = new ExecutorCompletionService(threadPool); // 用CompletionService提交分发任务 for (AllTypeEnum tempEnum : enumValues) { completionService.submit(new AnimalCountTask(account, accountType, tempEnum.getType())); } // 拿取返回值并计算总和 for (AllTypeEnum tempEnum : enumValues ) { try { Long value = completionService.take().get();// 从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞,直到有任务完成; if (value != null) { resultValue += value; } } catch (Exception e) { log.error("[getAllAnimalCount]线程执行出现异常", e); } } return resultValue; } catch (Exception e) { log.error("[getAllAnimalCount]出现异常", e); } return 0; }3、Task 任务

同上Callable的实现

/** * 查询动物数量任务 Callable版本 * * @author XXX * @date 2020/05/14 */ @Data @Slf4j public class AnimalCountTask implements Callable { /** * 账号 */ private String account; /** * 账户类型 */ private int accountType; /** * 动物类型 */ private int animalType; public AnimalCountTask(String account, int accountType, int animalType) { this.account = account; this.accountType = accountType; this.animalType= animalType; } @Override public Long call() throws Exception { AllTypeEnum typeEnum = AllTypeEnum.getEnumByType(getType()); if (typeEnum != null) { //获取具体业务Bean CommonAnimalService commonAnimalService = SpringContext.getBean("commonAnimalServiceImpl"); long num = commonAnimalService.countAnimalNum(getAccount(), getWarningType()); return num; } } return null; } }

说一下,Future.get()取结果时,为什么try catch异常。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3